作者:陈广
日期:2018-4-1
上篇文章我们讲了 Socket 异步编程 SAEA 模式的使用方法,那篇文章仅仅是帮助你理解 SAEA 侦听、数据接收和发送的流程,并非正确使用方法。本文深入讨论如何使用 SAEA
SAEA 面向的是高性能,高并发网络服务器,因此它将资源的循环利用做到了极致。上篇文章我们写的代码有三处可改进的地方:
SocketAsyncEventArgs
对象的共用。当服务器侦听到一个连接后,便会创建一个新的SocketAsyncEventArgs
对象,然后将此对象传递给数据接收和发送进程,这是我们上篇文章已经实现了的。但是,数据的接收和发送进程亦可以共享这一个SocketAsyncEventArgs
对象。这样就实现所一个连接的收发操作共享一个SocketAsyncEventArgs
对象。SocketAsyncEventArgs
对象的重用。很多时候,客户端和服务器并非保持长久连接,而是连接一次,数据交换,然后断开,需要再次连接时则重复这一过程,典型如 HTTP。当服务器每天有大量的这种瞬时访问时,则会产生无数SocketAsyncEventArgs
对象的碎片,这显然会增加服务器的负担。解决方案是建立 SAEA 对象池,当一个SocketAsyncEventArgs
对象使用完毕后,将它扔进池里,下次有连接需要使用时则从池里捞一个来用。new
一个长度为1024的字节数组作为接收缓存,用完就扔,发送也一样。这样的设计在服务器瞬时访问量高时肯定是会有问题的。解决方案是分配一个大块的内存做为缓存池,供所有的 SAEA 使用。一个 SAEA 在创建时从池中指定一块区域作为缓存,用完后,将它归还池子,以便下次有新的 SAEA 创建时可以重新利用这块内存区域,以防止内存碎片化。本来只需要一个栈就可以直接存储并循环使用 SAEA 对象,可以直接使用,但考虑到栈不支持多线程,使用时需要上锁,所以需要包装一层。
在服务器程序中新建一个SocketAsyncEventArgsPool
类,使用如下命名空间:
using System;
using System.Collections.Generic;
using System.Net.Sockets;
代码如下:
internal class SocketAsyncEventArgsPool
{
Stack<SocketAsyncEventArgs> saeaPool;//saea池,使用栈存储
internal SocketAsyncEventArgsPool(int capacity)
{
saeaPool = new Stack<SocketAsyncEventArgs>(capacity);
}
internal void Push(SocketAsyncEventArgs item)
{
if (item == null)
{
throw new ArgumentNullException("传入的SocketAsyncEventArgs对象不能为空!");
}
lock (saeaPool)
{ //压入一个SocketAsyncEventArgs
saeaPool.Push(item);
}
}
internal SocketAsyncEventArgs Pop()
{
lock (saeaPool)
{ //取出一个SocketAsyncEventArgs
return saeaPool.Pop();
}
}
internal int Count
{
get { return saeaPool.Count; }
}
}
服务器在初始化时,将创建好的 SAEA Push
进池里。当连接需要使用 SAEA 时从池里Pop
一个,用完再Push
进去。从而实现循环利用。
在服务器程序中新建一个BufferManager
类,使用如下命名空间:
using System;
using System.Collections.Generic;
using System.Net.Sockets;
代码如下:
class BufferManager
{
int totalSize; //缓存池总长度(字节为单位)
byte[] bufferBlock; //缓存池所在内存空间
Stack<int> IndexPool; //此栈记录缓存池中处于回收状态的缓存
int usedIndex;//缓存池从最小索引值开始使用,此变量记录曾经使用到的最大值
int buffSize;//单个缓存的长度(字节为单位)
public BufferManager(int totalSize, int buffSize)
{
this.totalSize = totalSize;
usedIndex = 0;
this.buffSize = buffSize;
IndexPool = new Stack<int>();
}
//初始化缓存池
internal void InitBuffer()
{
bufferBlock = new byte[totalSize];
}
//为作为参数传递进来的saes划分缓存空间
internal bool SetBuffer(SocketAsyncEventArgs saea)
{
if (IndexPool.Count > 0) //如果存在处于回收状态的缓存
{ //从栈中取出缓存地址并赋予saea
saea.SetBuffer(bufferBlock, IndexPool.Pop(), buffSize);
}
else //没有处于回收状态的缓存
{ //如果缓存池空间不够则返回false
if ((totalSize - buffSize) < usedIndex)
{
return false;
}
saea.SetBuffer(bufferBlock, usedIndex, buffSize);//分配缓存池中的新空间
usedIndex += buffSize;//指定缓存池中新空间和旧空间的分界点
}
return true;
}
//释放saea所使用的缓存空间
internal void FreeBuffer(SocketAsyncEventArgs saea)
{
IndexPool.Push(saea.Offset);//将saea中用完的缓存地址压入栈中
saea.SetBuffer(null, 0, 0);
}
}
这也算是一个小小的数据结构吧,微软实现,精简、漂亮。它划分了一块大的内存空间(大小为totalSize
)给所有 SAEA 共同使用。这块内存空间会被划分为一个个小块,每个 SAEA 使用一块,需要注意的是这些小块空间必须容量相同(大小为buffSize
),这也是此数据结构能够这样实现的前提。在 Socket 连接释放时,将相应 SAEA 所使用的小块空间释放回缓存池,以便新的 SAEA 再次使用。
这段代码已经注释得很清楚了,下面画张图演示它的动作过程吧,以方便各位理解。
首先是缓存池中的三块缓存依次被使用
接下来还回第一块缓存和第二块缓存,它们的地址先后入栈。当新 SAEA 申请使用缓存时,从栈中Pop
出地址25,将其对应的第二块缓存分配给新 SAEA 使用。
准备工作完毕,下面可以开始改服务器程序了。这一块微软的示例程序好象是有点问题的,花了精力写了一个先进的网络编程模型,却没有写相应文档,只有API帮助,而且示例程序还有问题。真不知道微软在想啥。只能自己改了。
命名空间:
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
Main()方法:
static Socket listener;
static BufferManager bm;//缓存池
static SocketAsyncEventArgsPool saeaPool;//saea对象池
static int connectCount = 20; //最大连接数
static int bufferSize = 25; //缓存大小,25个字节
static SemaphoreSlim acceptLimit;//用于控制同时访问线程数的信号量
static void Main(string[] args)
{
//初始化缓存池
bm = new BufferManager(connectCount * bufferSize, bufferSize);//缓存池
bm.InitBuffer();
//初始化saea对象池,将100个设置好的saea加入对象池
saeaPool = new SocketAsyncEventArgsPool(connectCount);//saea对象池
SocketAsyncEventArgs saea;
for (int i = 0; i < connectCount; i++)
{
saea = new SocketAsyncEventArgs();
saea.Completed += new EventHandler<SocketAsyncEventArgs>(OnIOCompleted);//给新建saea对象指定事件方法
bm.SetBuffer(saea);//给saea分配缓存
saeaPool.Push(saea);//将saea压入saea对象池
}
acceptLimit = new SemaphoreSlim(connectCount, connectCount);//用于控制同时访问线程数的信号量
IPAddress ip = IPAddress.Parse("127.0.0.1");
IPEndPoint point = new IPEndPoint(ip, 5000);
listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
try
{
listener.Bind(point);
listener.Listen(connectCount);
SocketAsyncEventArgs acceptEventArg = new SocketAsyncEventArgs();//所有侦听共用此saea
acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(OnAccept);
StartAccept(acceptEventArg);//开始第一个侦听周期
Console.WriteLine("开始侦听...");
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
Console.ReadLine();
}
接收模块:
//开始接收,将AcceptAsync包装在此方法内
static void StartAccept(SocketAsyncEventArgs e)
{
acceptLimit.Wait();
if (!listener.AcceptAsync(e))//异步侦听连接
{
OnAccept(null, e);
}
}
//收到连接后触发的事件
static void OnAccept(object sender, SocketAsyncEventArgs e)
{
Socket handler = e.AcceptSocket;
Console.WriteLine($"侦听到来自{handler.RemoteEndPoint.ToString()}的连接请求");
//为新连接开启一个异步发送消息进程
SocketAsyncEventArgs saea = saeaPool.Pop();//从saea对象池获取一个SocketAsyncEventArgs
bm.SetBuffer(saea);//重新设置缓存
saea.AcceptSocket = e.AcceptSocket;//此处的AcceptSocket参数仅用于传递发送Socket
StartReceive(saea);
e.AcceptSocket = null;//为重复利用e,必须使用此句代码
StartAccept(e);//进入下一个侦听周期,注意,参数e是上一个StartAccept传递过来的
}
接收和发送共享事件方法:
static void OnIOCompleted(object sender, SocketAsyncEventArgs e)
{ //根据不同的动作分配不同的操作
switch (e.LastOperation)
{
case SocketAsyncOperation.Receive: //为接收操作时
ProcessReceive(e);
break;
case SocketAsyncOperation.Send: //为发送操作时
ProcessSend(e);
break;
default:
throw new ArgumentException("OnIOCompleted事件错误!");
}
}
这里是跟上篇文章中最大的不同,接收和发送共享一个事件,通过判断 Socket 的最后一个动作来决定分配处理方法。
接收消息模块:
//开始读取消息,将ReceiveAsync包装在此方法内
static void StartReceive(SocketAsyncEventArgs e)
{
if (!e.AcceptSocket.ReceiveAsync(e))//返回true,则会触发OnRead事件进行异步读取
{ //返回false则同步读取
ProcessReceive(e);
}
}
//收到消息后的处理方法
static void ProcessReceive(SocketAsyncEventArgs e)
{
if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
{
string sendStr = Encoding.Unicode.GetString(e.Buffer, e.Offset, e.BytesTransferred);
Console.WriteLine($"收到来自{e.AcceptSocket.RemoteEndPoint.ToString()}的信息:{sendStr}");
//将收到的消息回发
e.SetBuffer(e.Offset, e.BytesTransferred);
StartSend(e);
}
else
{
CloseSocket(e);
}
}
发送消息模块:
//开始读取消息,将ReceiveAsync包装在此方法内
static void StartSend(SocketAsyncEventArgs e)
{
if (!e.AcceptSocket.SendAsync(e))//返回true,则会触发OnSend事件进行异步读取
{ //返回false则同步发送
ProcessSend(e);
}
}
//收到消息后触发的事件
static void ProcessSend(SocketAsyncEventArgs e)
{
Socket s = e.AcceptSocket;
if (e.SocketError == SocketError.Success)
{
e.SetBuffer(0, bufferSize); //将offset指针指回开始处
StartReceive(e);//进入下一个接收周期
}
else
{
CloseSocket(e);
}
}
关闭消息模块:
static void CloseSocket(SocketAsyncEventArgs e)
{
Socket s = e.AcceptSocket;
string epStr = s.RemoteEndPoint.ToString();
try
{
s.Shutdown(SocketShutdown.Send);
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
s.Close();
acceptLimit.Release();//释放信号量
bm.FreeBuffer(e); //释放缓存
saeaPool.Push(e); //将saea回收至对象池
Console.WriteLine($"已关闭{epStr}连接");
}
整个服务器的逻辑是:侦听连接–>收到连接后创建 Socket --> 进入异步接收消息状态 --> 收到消息后将消息回发 --> 进入异步发送状态 --> 发送完毕后,继续进入异步接收消息状态。服务器的最大连接数设置为20,缓存大小设置为25个字节。
怎么少事怎么来,继续用 Thread。代码如下:
const int SEND_TIME= 10;
static void Main(string[] args)
{
//获取服务器端IP地址
IPAddress ip = IPAddress.Parse("127.0.0.1");
try
{
for (int i = 1; i <= 50; i++)
{
Socket s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
s.Connect(ip, 5000); //向服务器发起连接
Console.WriteLine("开始连接服务器 {0} ...", ip.ToString());
if (s.Connected)
{
Console.WriteLine("连接成功!");
}
//发送消息线程
Thread tSend = new Thread(() => SendMessage(i, s));
tSend.IsBackground = true;
tSend.Start();
//接收消息线程
Thread tRecv = new Thread(() => ReceiveMessage(i, s));
tRecv.IsBackground = true;
tRecv.Start();
Thread.Sleep(1000);
}
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
Console.ReadLine();
}
//发送消息线程方法
static void SendMessage(int threadId, Socket s)
{
for (int i = 1; i <= SEND_TIME; i++)
{
string sendStr = $"线程{threadId}-消息{i}";
byte[] sendBuff = Encoding.Unicode.GetBytes(sendStr);
s.Send(sendBuff, sendBuff.Length, SocketFlags.None);
Thread.Sleep(500);
}
}
//接收消息线程方法
static void ReceiveMessage(int threadId, Socket s)
{
byte[] recvBuff = new byte[25];
try
{
int i = 0;
while (i < SEND_TIME)
{
int count = s.Receive(recvBuff, recvBuff.Length, SocketFlags.None);
string recvStr = Encoding.Unicode.GetString(recvBuff, 0, count);
Console.WriteLine($"线程{threadId}收到服务器信息:{recvStr}");
i++;
}
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
finally
{
s.Close();
Console.WriteLine($"线程{threadId}退出连接。");
}
}
客户端每隔 1 秒开一条连接,共开50条连接。每条连接每隔半秒向服务器发送一条消息,发完 5 条后结束,在收到服务器应答的 5 条消息后关闭连接。
运行效果:
这回我们使用之前《Socket多线程编程》这篇文章中的【控制台版聊天程序】这一节中的客户端代码:
static void Main(string[] args)
{
//获取服务器端IP地址
IPAddress ip = IPAddress.Parse("127.0.0.1");
try
{
Socket s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
s.Connect(ip, 5000); //向服务器发起连接
Console.WriteLine("开始连接服务器 {0} ...", ip.ToString());
if (s.Connected)
{
Console.WriteLine("连接成功!");
}
//发送消息线程
Thread tSend = new Thread(() => SendMessage(s));
tSend.Start();
//接收消息线程
Thread tRecv = new Thread(() => ReceiveMessage(s));
tRecv.Start();
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
}
//发送消息线程方法
static void SendMessage(Socket s)
{
while (true)
{
string sendStr = Console.ReadLine();
byte[] sendBuff = Encoding.Unicode.GetBytes(sendStr);
s.Send(sendBuff, sendBuff.Length, SocketFlags.None);
}
}
//接收消息线程方法
static void ReceiveMessage(Socket s)
{
byte[] recvBuff = new byte[1024];
try
{
while (true)
{
int count = s.Receive(recvBuff, recvBuff.Length, SocketFlags.None);
string recvStr = Encoding.Unicode.GetString(recvBuff, 0, count);
Console.WriteLine("收到服务器信息:{0}", recvStr);
}
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
finally
{
s.Close();
Console.WriteLine("服务器已经退出连接。");
}
}
使用这个程序连接服务器,得到如下结果:
为什么会出现乱码呢?这是因为缓存长度为25,我们发送的信息长度超过了25,被截断分开发送。而我们以 Unicode 进行发送,Unicode 格式的每个编码占2个字节,这时第13个字每的编码的上半部份第一次发送,而下半部分到了第二次发送,从而导致整个编码错乱。所以切记,如果发送格式每字符占两个字节,缓存长度必须为偶数。我们将服务器缓存长度设置为26,再运行程序就没问题了。更改服务器代码:
static int bufferSize = 26; //缓存大小,26个字节
运行结果:
另外就是信息过长被截断的问题,解决办法第一是加长接收缓存长度,但长度设得太长,整个缓存池会占用大量内存。这个缓存长度需要根据实际情况来设置。第二是在客户端限定发送的信息长度。第三是手动分界,在服务端拼接信息。拼接信息这块我在之前的文章已经演示过,这里就不再多讲了。这样,整个 SAEA 模式我们也讲完了。需要注意的是这次的服务器的使用场景是一应一答的模式,可以接收和发送共享缓存和 saea ,其它情况下不一定行得能,这点是需要注意的。